package com.innovation.ic.sc.base.thread.listener.kafka;

import com.alibaba.fastjson.JSONObject;
import com.innovation.ic.sc.base.handler.sc.ModelHandler;
import com.innovation.ic.sc.base.model.sc.User;
import com.innovation.ic.sc.base.pojo.constant.DatabaseOperationType;
import com.innovation.ic.sc.base.pojo.constant.handler.RedisStorage;
import com.innovation.ic.sc.base.service.ServiceHelper;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;

/**
 * @desc   用户监听处理线程类
 * @author lishen
 * @time   2023年2月6日16:34:12
 */
public class UserListenHandleThread extends AbstractKafkaThread implements Runnable {

    public UserListenHandleThread(String type, ConsumerRecord<?, ?> consumer, JSONObject dataJsonObject, ModelHandler modelHandler, ServiceHelper serviceHelper) {
        this.type = type;
        this.consumer = consumer;
        this.dataJsonObject = dataJsonObject;
        this.modelHandler = modelHandler;
        this.serviceHelper = serviceHelper;
    }

    @Override
    @SneakyThrows
    public void run() {
        if (type.equals(DatabaseOperationType.INSERT)) {
            // 打印监听到的数据库变动内容
            printConsumerContent(consumer);

            // 向redis存储数据
            String keyAsc = RedisStorage.TABLE + RedisStorage.USER + RedisStorage.CREATE_DATE_ASC;
            String keyDesc = RedisStorage.TABLE + RedisStorage.USER + RedisStorage.CREATE_DATE_DESC;

            User user = modelHandler.jsonObjectToUser(dataJsonObject);
            String value = modelHandler.toXmlStorageString(user);
            double score = System.currentTimeMillis();

            serviceHelper.getRedisManager().zAdd(keyAsc, value, score);
            serviceHelper.getRedisManager().zAdd(keyDesc, value, -score);
        }
        if (type.equals(DatabaseOperationType.DELETE)) {
            // 打印监听到的数据库变动内容
            printConsumerContent(consumer);

            // 从redis中删除数据
            String keyAsc = RedisStorage.TABLE + RedisStorage.USER +
                    RedisStorage.CREATE_DATE_ASC;
            String keyDesc = RedisStorage.TABLE + RedisStorage.USER +
                    RedisStorage.CREATE_DATE_DESC;

            User user = modelHandler.jsonObjectToUser(dataJsonObject);
            String value = modelHandler.toXmlStorageString(user);

            serviceHelper.getRedisManager().zRemove(keyAsc, value);
            serviceHelper.getRedisManager().zRemove(keyDesc, value);
        }
        if (type.equals(DatabaseOperationType.UPDATE)) {
            // 打印监听到的数据库变动内容
            printConsumerContent(consumer);

            // 先删除redis中的数据，再插入数据
            String keyAsc = RedisStorage.TABLE + RedisStorage.USER +
                    RedisStorage.CREATE_DATE_ASC;
            String keyDesc = RedisStorage.TABLE + RedisStorage.USER +
                    RedisStorage.CREATE_DATE_DESC;

            User user = modelHandler.jsonObjectToUser(dataJsonObject);
            User newUser = new User();
            newUser.setId(user.getId());
            String newUserString = modelHandler.toXmlFilterString(newUser);

            List ascList = serviceHelper.getRedisManager().page(keyAsc, newUserString);
            if (null != ascList && ascList.size() > 0) {
                for (int i = 0; i < ascList.size(); i++) {
                    String xmlString = (String) ascList.get(i);
                    serviceHelper.getRedisManager().zRemove(keyAsc, xmlString);
                    String value = modelHandler.toXmlStorageString(user);
                    double score = System.currentTimeMillis();
                    serviceHelper.getRedisManager().zAdd(keyAsc, value, score);
                }
            }

            List descList = serviceHelper.getRedisManager().page(keyDesc, newUserString);
            if (null != descList && descList.size() > 0) {
                for (int i = 0; i < descList.size(); i++) {
                    String xmlString = (String) descList.get(i);
                    serviceHelper.getRedisManager().zRemove(keyDesc, xmlString);
                    String value = modelHandler.toXmlStorageString(user);
                    double score = System.currentTimeMillis();
                    serviceHelper.getRedisManager().zAdd(keyDesc, value, score);
                }
            }
        }
    }

    @Override
    protected void doWork() {

    }
}